package com.wolfking.aggregate;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * {@link InvocationHandler} that backs an aggregate proxy interface.
 * <p>
 * For each configured interface method it submits the configured provider methods to the
 * {@link ExecutorService} registered for that method, waits for them to finish, and then hands
 * the collected provider results to the optional consumer method. The consumer's return value
 * becomes the result of the proxied call.
 *
 * @author 赵伟伟(wolfking)
 * created on 2019-07-10 11:53
 */
@Slf4j
class ConcurrentAggregateProxy implements InvocationHandler {

    /**
     * Upper bound, in milliseconds, for each individual {@link Future#get(long, TimeUnit)} call
     * made after the latch wait in {@link #invokeMethod(Method, Object[])}.
     */
    private static final long FUTURE_GET_TIMEOUT_MILLIS = 10000L;

    /**
     * {@code MethodHandles.privateLookupIn(Class, Lookup)} when the running JDK provides it
     * (Java 9+), otherwise {@code null}. Resolved reflectively so this class still compiles
     * against Java 8.
     */
    private static final Method PRIVATE_LOOKUP_IN = findPrivateLookupIn();

    private final Map<Method, AggregateMethodConfig> methodConfigMap;

    private final Map<Method, Map<String, Integer>> methodParameterConfig;

    private final Map<Method, Boolean> ignoreExceptionConfig;

    private final Map<Method, ExecutorService> executorServiceMap;

    /**
     * @param methodConfig          aggregate configuration keyed by proxied interface method
     * @param executorServiceMap    executor used to run the providers of each proxied method
     * @param methodParameterConfig for each proxied method, parameter name to index in the call's argument array
     * @param ignoreExceptionConfig for each proxied method, whether provider exceptions are swallowed
     */
    ConcurrentAggregateProxy(
            Map<Method, AggregateMethodConfig> methodConfig, Map<Method, ExecutorService> executorServiceMap,
            Map<Method, Map<String, Integer>> methodParameterConfig, Map<Method, Boolean> ignoreExceptionConfig) {
        this.methodConfigMap = methodConfig;
        this.executorServiceMap = executorServiceMap;
        this.methodParameterConfig = methodParameterConfig;
        this.ignoreExceptionConfig = ignoreExceptionConfig;
    }

    /**
     * Dispatches a call made on the proxy instance.
     * <ul>
     *   <li>methods declared by {@link Object} are invoked on this handler;</li>
     *   <li>interface default methods are invoked on the proxy itself;</li>
     *   <li>methods present in the method configuration are aggregated;</li>
     *   <li>anything else is rejected with an {@code AggregateException}.</li>
     * </ul>
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the interface method being invoked
     * @param args   the call arguments; {@code null} when the method takes none
     * @return the result of the dispatched call
     * @throws Throwable any failure raised by the dispatched call
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        /* methods declared by Object */
        if (method.getDeclaringClass().equals(Object.class)) {
            return method.invoke(this, args);
        }
        /* interface default methods */
        if (method.isDefault()) {
            return invokeDefaultMethod(proxy, method, args);
        }
        /* proxied (aggregated) methods */
        if (methodConfigMap.containsKey(method)) {
            return invokeMethod(method, args);
        }
        /* unknown methods */
        throw new AggregateException("not suitable method " + method);
    }

    /**
     * Looks up {@code MethodHandles.privateLookupIn} reflectively.
     *
     * @return the method, or {@code null} when the running JDK does not have it
     */
    private static Method findPrivateLookupIn() {
        try {
            return MethodHandles.class.getMethod("privateLookupIn", Class.class, MethodHandles.Lookup.class);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    /**
     * Invokes an interface default method on the proxy instance through a special method handle.
     *
     * @param proxy  the proxy instance to bind the default method to
     * @param method the default method
     * @param args   the call arguments; may be {@code null} for a no-arg method
     * @return the default method's return value
     * @throws Throwable any failure while obtaining the lookup or running the default method
     */
    private static Object invokeDefaultMethod(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        MethodHandles.Lookup lookup;
        if (PRIVATE_LOOKUP_IN != null) {
            // Java 9+: supported way to obtain a lookup with private access on the interface.
            lookup = (MethodHandles.Lookup) PRIVATE_LOOKUP_IN.invoke(null, declaringClass, MethodHandles.lookup());
        } else {
            // Java 8: no privateLookupIn, so fall back to the private Lookup(Class, int) constructor.
            Constructor<MethodHandles.Lookup> constructor =
                    MethodHandles.Lookup.class.getDeclaredConstructor(Class.class, int.class);
            constructor.setAccessible(true);
            int allModes = MethodHandles.Lookup.PUBLIC | MethodHandles.Lookup.PRIVATE
                    | MethodHandles.Lookup.PROTECTED | MethodHandles.Lookup.PACKAGE;
            lookup = constructor.newInstance(declaringClass, allModes);
        }
        // The proxy passes null for no-arg methods; normalize so the handle always gets an array.
        Object[] callArgs = args == null ? new Object[0] : args;
        return lookup.unreflectSpecial(method, declaringClass)
                .bindTo(proxy)
                .invokeWithArguments(callArgs);
    }

    /**
     * Runs every provider configured for {@code method} on that method's executor and feeds the
     * results to the consumer.
     * <p>
     * After submitting all providers this waits up to the configured timeout for all of them to
     * finish, then collects each future with an additional per-future limit of
     * {@link #FUTURE_GET_TIMEOUT_MILLIS}. Results of providers flagged as "discard result" are
     * awaited but not passed to the consumer.
     *
     * @param method the proxied interface method
     * @param args   the arguments of the proxied call
     * @return the consumer method's return value, or {@code null} when no consumer is configured
     * @throws Throwable any failure from waiting on the providers or from invoking the consumer
     */
    private Object invokeMethod(Method method, Object[] args) throws Throwable {
        Map<String, Integer> paramIndexMap = methodParameterConfig.get(method);
        // A missing entry counts as "do not ignore"; unboxing a null Boolean inside the task's
        // catch block would otherwise replace the provider's exception with a NullPointerException.
        final boolean ignoreException = Boolean.TRUE.equals(ignoreExceptionConfig.get(method));
        AggregateMethodConfig aggregateMethodConfig = methodConfigMap.get(method);
        List<AggregateProviderConfig> aggregateProviderConfigList = aggregateMethodConfig.getAggregateProviderConfigList();
        ExecutorService executorService = executorServiceMap.get(method);
        if (executorService == null && !aggregateProviderConfigList.isEmpty()) {
            throw new AggregateException("no executor service configured for method " + method);
        }
        CountDownLatch stopDownLatch = new CountDownLatch(aggregateProviderConfigList.size());
        List<Future<Object>> futureList = new ArrayList<>();
        List<Future<Object>> unUsedList = new ArrayList<>();
        for (AggregateProviderConfig providerConfig : aggregateProviderConfigList) {
            String[] parameterNames = providerConfig.getParameterNames();
            // Pick the provider's arguments out of the proxied call's arguments by parameter name.
            List<Object> invokeArgs = new ArrayList<>(parameterNames.length);
            for (String parameterName : parameterNames) {
                Integer index = paramIndexMap == null ? null : paramIndexMap.get(parameterName);
                if (index == null) {
                    throw new AggregateException(
                            "no parameter named " + parameterName + " configured for method " + method);
                }
                invokeArgs.add(args[index]);
            }
            log.debug("submit the bean is {} method is {} args is {}", providerConfig.getBean(), providerConfig.getMethod(), invokeArgs);
            Future<Object> submit = executorService.submit(() -> {
                try {
                    return providerConfig.getMethod().invoke(providerConfig.getBean(), invokeArgs.toArray());
                } catch (Exception e) {
                    if (ignoreException || providerConfig.isIgnoreException()) {
                        log.warn("current ignore the exception", e);
                        return null;
                    } else {
                        throw e;
                    }
                } finally {
                    // Count down on success and on failure alike so the latch reflects finished tasks.
                    stopDownLatch.countDown();
                }
            });
            if (!providerConfig.isDiscardResult()) {
                futureList.add(submit);
            } else {
                unUsedList.add(submit);
            }
        }
        stopDownLatch.await(aggregateMethodConfig.getTimeout(), TimeUnit.MILLISECONDS);
        AggregateConsumerConfig aggregateConsumerConfig = aggregateMethodConfig.getAggregateConsumerConfig();
        List<Object> consumeArgs = new ArrayList<>(futureList.size());
        for (Future<Object> future : futureList) {
            consumeArgs.add(future.get(FUTURE_GET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        }
        // Discarded results are still awaited, so a failed or timed-out task surfaces from get().
        for (Future<Object> future : unUsedList) {
            future.get(FUTURE_GET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        }
        if (aggregateConsumerConfig != null) {
            Object[] consumeArgArray = consumeArgs.toArray();
            log.debug("consume the aggregate bean is {},method is {},args iss {}",
                    aggregateConsumerConfig.getBean(), aggregateConsumerConfig.getMethod(), consumeArgArray);
            return aggregateConsumerConfig.getMethod().invoke(aggregateConsumerConfig.getBean(), consumeArgArray);
        } else {
            log.info("current no consume config,return null");
            return null;
        }
    }
}
